#pragma once
/*
 * session which runs in the same thread.  
 */
#include <functional>
#include <vector>
#include <memory>
#include <map>
#include <mutex>
#include <atomic>
#include "anet.hpp"
#include "connection.hpp"
#include "main_service.h"
#include "rpc/rpc_handle.hpp"
#include "default_interface.hpp"
#include "allocator.hpp"

namespace anet {
	namespace tcp {
		class IReleaser;
		template <typename proxySession>
		class CSharePtrSession : public ISession,
			public std::enable_shared_from_this<CSharePtrSession<proxySession>> {
		public:
			CSharePtrSession() {
				initProxySession(proxySession::Create());
			}

			explicit CSharePtrSession(proxySession* pSession) {
				initProxySession(pSession);
			}

			virtual ~CSharePtrSession() = default;

			virtual void onRecv(const char* msg, int len) override {
				auto self = this->shared_from_this();
				// std::vector<char> vecMsg(msg, msg + len);
				anet::utils::PoolVector<char> vecMsg(msg, msg + len);
				CMainService::instance().post([self, this, vecMsg = std::move(vecMsg)]{
					this->OnMessage(vecMsg.data(), vecMsg.size());
					});
			}

			virtual void onConnected(connSharePtr conn) override {
				m_closed.store(false);
				m_conn = conn;
				auto self = this->shared_from_this();
				CMainService::instance().post([self, this]() {
					this->OnConnected();
				});
			}

			virtual void onTerminate() override {
				m_closed.store(true);
				auto self = this->shared_from_this();
				CMainService::instance().post([self, this]() {
					this->OnTerminate();
				});
			}

			virtual void release() override {
				auto self = this->shared_from_this();
				CMainService::instance().post([self, this]() {
					this->OnRelease();
					this->Release();
				});
			}

			void setReleaser(IReleaser* releaser) {
				if (releaser == nullptr) {
					throw std::invalid_argument("releaser cannot be null");
					return;
				}
				m_releaser = releaser;
			}

			unsigned int getId() const {
				return m_id;
			}

			void setId(unsigned int id) {
				m_id = id;
			}

			void Release() {
				if (m_releaser != nullptr) {
					auto self = this->shared_from_this();
					m_releaser->releaseSession((void*)&self);
				} else {
					LogCrit("Cannot find releaser object");
				}
			}

			void OnConnected() {
				m_proxySession->OnConnected();
			}

			void OnMessage(const char* msg, int len) {
				m_proxySession->OnMessage(msg, len);
			}

			void OnTerminate() {
				m_proxySession->OnTerminate();
			}

			void OnRelease() {
				m_proxySession->OnRelease();
			}

			void Send(const std::string& msg) {
				this->Send(msg.c_str(), int(msg.size()));
			}

			void Send(const char* msg, int len) {
				if (msg == nullptr || len == 0) {
					return;
				}
				if (IsConnected()) {
					m_conn->Send(msg, len);
				}
			}

			void Send(unsigned short msgId, const char* msg, int len) {
				anet::utils::PoolVector<char> vec;
				int size = buildProto(msgId, msg, len, vec);
				this->Send(vec.data(), size);
			}

			template <typename... Args>
			void remote_call(const std::string& method, Args&&... args) {
				anet::rpc_codec::rpc_stream stream;
				anet::rpc_codec::pack_remote_call(stream, method, std::forward<Args>(args)...);
				this->Send(stream.c_str(), static_cast<int>(stream.buf().size()));
			}

			bool IsConnected() const {
				return !m_closed.load();
			}

			void Close() {
				if (IsConnected()) {
					m_conn->close();
				}
			}

			const std::string getRemoteIP() const {
				return m_conn->getRemoteAddr();
			}

			unsigned short getRemotePort() const {
				return m_conn->getRemotePort();
			}

			proxySession* getProxySession() const {
				return m_proxySession;
			}

			int buildProto(unsigned short msgId, const char* msg, int len, 
				anet::utils::PoolVector<char>& vecBuf) {
				vecBuf.resize(sizeof(SCommonHead) + sizeof(unsigned short) + len);
				char* pBuf = vecBuf.data();

				SCommonHead& head = *reinterpret_cast<SCommonHead*>(pBuf);
				head.len = uint32(htonl(uint32(len) + sizeof(msgId)));

				*reinterpret_cast<uint16*>(pBuf + gProto_head_size) = htons(msgId);
				std::memcpy(pBuf + gProto_head_size + sizeof(msgId), msg, len);
				return static_cast<int>(gProto_head_size + sizeof(msgId) + len);
			}

		private:
			void initProxySession(proxySession* pSession) {
				if (pSession == nullptr) {
					throw std::runtime_error("proxySession cannot be null");
					return;
				}
				m_proxySession = pSession;
				m_proxySession->SetSession(this);
			}

		private:
			connSharePtr m_conn{ nullptr };
			unsigned int m_id{ 0 };
			std::atomic_bool m_closed{ true };
			IReleaser* m_releaser{ nullptr };
			proxySession* m_proxySession{ nullptr };
		};

		class IReleaser {
		public:
			virtual ~IReleaser() = default;
			virtual void releaseSession(void* pSession) = 0;
		};

		template <typename proxySession>
		class CSharePtrSessionFactory : public ISessionFactory, public IReleaser {
		public:
			CSharePtrSessionFactory() = default;
			virtual ~CSharePtrSessionFactory() = default;
			CSharePtrSessionFactory(const CSharePtrSessionFactory&) = delete;
			CSharePtrSessionFactory& operator=(const CSharePtrSessionFactory&) = delete;

			using sharePtrSession = CSharePtrSession<proxySession>;
			using sessionSharePtr = std::shared_ptr<sharePtrSession>;
			using Id2SessionPtr = std::map<unsigned int, sessionSharePtr>;

			virtual ISession* createSession() override {
				return _createSession(nullptr);
			}

			virtual void releaseSession(void* session) override {
				if (session == nullptr) {
					return;
				}

				auto& realSession = *reinterpret_cast<sessionSharePtr*>(session);
				const auto id = realSession->getId();

				{
					std::lock_guard<std::mutex> lock(m_mutex);
					m_sessions.erase(id);
				}

				LogADebug("%d is removed now, there are %d sessions in factory", id, m_sessions.size());
			}

			ISession* createSession(proxySession* pSession) {
				return _createSession(pSession);
			}

			std::pair<bool, sessionSharePtr> findSession(unsigned int id) const {
				std::lock_guard<std::mutex> lock(m_mutex);
				auto it = m_sessions.find(id);
				if (it != m_sessions.end()) {
					return { true, it->second };
				} else {
					return { false, sessionSharePtr{} };
				}
			}

		private:
			ISession* _createSession(proxySession* pSession) {
				std::lock_guard<std::mutex> lock(m_mutex);

				const unsigned int sessionId = m_nextId++;
				sessionSharePtr sessSharePtr = (pSession != nullptr) ?
					std::make_shared<sharePtrSession>(pSession) :
					std::make_shared<sharePtrSession>();

				sessSharePtr->setId(sessionId);
				sessSharePtr->setReleaser(this);

				m_sessions[sessionId] = sessSharePtr;
				return sessSharePtr.get();
			}

		private:
			mutable std::mutex m_mutex;
			Id2SessionPtr m_sessions;
			unsigned int m_nextId{ 1 };
		};
	}
}